/***********************************************************************
*                                                                      *
*               This software is part of the ast package               *
*          Copyright (c) 1990-2011 AT&T Intellectual Property          *
*                      and is licensed under the                       *
*                 Eclipse Public License, Version 1.0                  *
*                    by AT&T Intellectual Property                     *
*                                                                      *
*                A copy of the License is available at                 *
*          http://www.eclipse.org/org/documents/epl-v10.html           *
*         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
*                                                                      *
*              Information and Software Systems Research               *
*                            AT&T Research                             *
*                           Florham Park NJ                            *
*                                                                      *
*                 Glenn Fowler <gsf@research.att.com>                  *
*                                                                      *
***********************************************************************/
#pragma prototyped
/*
 * Glenn Fowler
 * AT&T Bell Laboratories
 *
 * remote coshell server main
 * except for history this program would
 * have been called shmux (with a TeX x)
 */

#include "service.h"

#include <namval.h>

static const char usage[] =
"[-?\n@(#)$Id: coshell (AT&T Research) 2007-10-23 $\n]"
USAGE_LICENSE
"[+NAME?coshell - network shell coprocess server]"
"[+DESCRIPTION?\bcoshell\b is a local network shell coprocess server for "
    "programs using \bcoshell\b(3). There is one \bcoshell\b server per "
    "user. This server runs as a daemon on the user's home host, and only "
    "processes running on the home host have access to the server. The "
    "server controls a background \bksh\b(1) shell process, initiated by "
    "\brsh\b(1) or \bssh\b(1), on each of the connected hosts. The "
    "environment of the local host shell is inherited from the server "
    "whereas the environment of remote shells is initialized by \b.profile\b "
    "and \b$ENV\b. The shells run with the \bksh\b \b--bgnice\b and "
    "\b--monitor\b options.]"
"[+?Job requests are accepted from user processes on the local host and "
    "are executed on the connected hosts. \bstdout\b, \bstderr\b, \bFPATH\b, "
    "\bNPROC\b (see ENVIRONMENT), \bPWD\b, \bPATH\b, \bVPATH\b, \bvpath\b, "
    "\bumask\b and the environment variables listed in \bCOEXPORT\b (see "
    "ENVIRONMENT) are set to match requesting user values. \bstdin\b is set "
    "to \v/dev/null\v; \bcoshell\b does not directly support interactive "
    "jobs. Job scheduling is based on load and idle time information "
    "generated by the \bss\b(1) system status daemon. This information is "
    "updated every 60 seconds on average.]"
"[+?The server is started by running \acoshell +\a. The command exits "
    "after a child server process is forked in the background. The optional "
    "\ainfo\a arguments name files containing local network host information "
    "which may be generated from two shell scripts \bgenlocal\b and "
    "\bgenshare\b under the subdirectory bin of the installation root "
    "directory. If no files are specified then the default \alocal\a is "
    "used. The local network is comprised of hosts sharing the same file "
    "name space.]"
"[+?Attributes used by \bcoshell\b can be categorized into two types, "
    "global and host-specific. The global attributes control \bcoshell\b and "
    "are not associated with any particular host. Attribute value pairs, not "
    "including readonly ones, may be specified in the local network host "
    "information files, in \bCOATTRIBUTES \b (see ENVIRONMENT) or may be "
    "set/added using \acoshell -a\a, and may be referenced in an expression "
    "in \bCOATTRIBUTES\b. Attribute names must match "
"[a-zA-Z_]][a-zA-Z_0-9]]*. In the following description on these "
    "attributes, \ahost\a may be an actual host name or a comma separated "
    "list of attribute value pairs specified in \bCOATTRIBUTES\b.]"

"\n"
"\n+ | - | -\acommand\a [ arg ... ]\n"
"\n"

"[+SEE ALSO?\bnmake\b(1), \bksh\b(1), \bss\b(1)]"
;

#ifndef PATHCHECK
#define PATHCHECK	CO_ID
#endif

#define CO_OPT_COMMAND	"command"
#define CO_OPT_DUP	"dup"
#define CO_OPT_HOME	"home"

static char*		coshell[] =	/* shell to run on other side	*/
{
	"3d", "ksh", "sh"
};

State_t		state;			/* program state		*/

/*
 * initialize the server state
 */

static void*
init(void* handle, int fdmax)
{
	register int	n;
	register char*	s;

	NoP(handle);
	message((-1, "init pid=%d", getpid()));
	state.clock = state.start = state.toss = cs.time;
	for (n = 0; n < 10; n++) TOSS;
	state.fdtotal = fdmax;
	if (!(state.con = newof(0, Connection_t, state.fdtotal, 0)))
		error(3, "out of space [con]");
	state.con[0].type = POLL;
	if ((n = getgroups(0, NiL)) <= 0)
		n = NGROUPS_MAX;
	if (!(state.gids = newof(0, gid_t, n + 2, 0)))
		error(3, "out of space [gids]");
	if ((n = getgroups(n, state.gids + 1)) < 0)
		n = 0;
	state.gids[n + 1] = -1;
	state.gids[0] = getegid();
	state.uid = geteuid();
	n = state.fdtotal / 2;
	if (!(state.job = state.jobnext = newof(0, Cojob_t, n, 0)))
		error(3, "out of space [job]");
	state.jobmax = state.jobnext += n - 1;

	/*
	 * initialze the shell table
	 */

	state.busy = BUSY;
	state.grace = GRACE;
	state.maxidle = INT_MAX;
	state.pool = ((s = getenv(CO_ENV_PROC)) && *s) ? (int)strtol(s, NiL, 0) : POOL;
	state.profile = strdup("{ . ./.profile; eval test -f \\$ENV \\&\\& . \\$ENV; } >/dev/null 2>&1 </dev/null");
	if (!(state.home = search(DEF|NEW, csname(0), NiL, NiL)))
		error(3, "cannot get local host address");
	state.shell = state.shellnext = state.home;
	message((-1, "local name is %s", state.home->name));

	/*
	 * load the local net configuration
	 */

	do info(DEF|NEW, *state.argv); while (*state.argv && *++state.argv);

	/*
	 * load the access controls if any
	 */

	info(SET, NiL);

	/*
	 * set the job limits
	 */

	if (state.perserver <= 0 || state.perserver > n)
		state.perserver = n;
	if (state.peruser <= 0 && (state.peruser = (3 * (state.pool - 1)) / 2) <= 0)
		state.peruser = 1;
	if (state.percpu <= 0 && (state.percpu = state.peruser / 4) <= 2)
		state.percpu = 2;

	/*
	 * bias the local host so it can generate more work
	 */

	if (!state.indirect.con && state.home->idle)
	{
		state.home->idle = 0;
		if (!(state.home->flags & SETBIAS)) state.home->bias *= 4;
	}

	/*
	 * get the shell path
	 */

	if (!(s = state.sh))
	{
		s = state.buf;
		for (n = 0;;)
		{
			if (pathpath(coshell[n], NiL, PATH_ABSOLUTE|PATH_REGULAR|PATH_EXECUTE, s, state.buflen))
				break;
			if (++n >= elementsof(coshell))
				error(3, "shell not found");
		}
	}

	/*
	 * parameterize the shell path name on state.home->type
	 */

	pathrepl(s, 0, state.home->type, "%s");
	if (!(state.sh = strdup(s)))
		error(3, "out of space [shell path]");
	message((-1, "parameterized shell path is %s", state.sh));

	/*
	 * initialize the remote shell path
	 */

	if (!(state.remote = strdup(CS_REMOTE_SHELL)))
		error(3, "out of space [remote shell path]");
	message((-1, "remote shell path is %s", state.remote));

	/*
	 * set up the mesg and pump connect streams
	 */

	state.mesg = stream(MESG, "mesg");
	state.pump = stream(PUMP, "pump");

	/*
	 * open the local shell
	 */

	shellopen(state.home, 2);
	return((void*)&state);
}

/*
 * accept a new user connection
 * no id checks since we can trust the connect stream path access
 */

static int
user(void* handle, int fd, Cs_id_t* id, int clone, char** argv)
{
	NoP(handle);
	NoP(id);
	NoP(clone);
	NoP(argv);
	if (state.indirect.con)
	{
		state.con[fd].type = INIT;
		state.con[fd].info.user.fds[0] = fd;
		state.con[fd].info.user.fds[1] = -1;
		state.con[fd].info.user.fds[2] = -1;
		if (cswrite(fd, "#00000\n", 7) != 7) return(-1);
	}
	else state.con[fd].type = IDENT;
	state.con[fd].info.user.flags = USER_IDENT;
	state.con[fd].info.user.home = 0;
	state.con[fd].info.user.pump = 0;
	state.con[fd].info.user.expr = 0;
	return(0);
}

#ifdef O_NONBLOCK

static int
nonblocking(int fd)
{
	int	flags;

	if ((flags = fcntl(fd, F_GETFL)) < 0)
		return flags;
	return fcntl(fd, F_SETFL, flags|O_NONBLOCK);
}

#else

#define nonblocking(f)

#endif

/*
 * service a read event
 */

static int
service(void* handle, register int fd)
{
	register int		n;
	register int		i;
	char*			s;
	char*			t;
	char*			x;
	char*			e;
	int			n1;
	int			n2;
	Coshell_t*		sp;
	Cojob_t*		jp;
	Cs_id_t			id;
	struct stat		st;
	struct stat		ts;
	int			fds[5];
	char			cmd[256];

	NoP(handle);
	switch (state.con[fd].type)
	{
	case ANON:
	case SHELL:
#ifdef O_NONBLOCK
		if (state.con[fd].flags >= 0 && fcntl(fd, F_SETFL, state.con[fd].flags|O_NONBLOCK) < 0)
			state.con[fd].flags = -1;
#endif
		n = csread(fd, s = state.buf, state.buflen, CS_LINE);
#ifdef O_NONBLOCK
		if (state.con[fd].flags >= 0 && fcntl(fd, F_SETFL, state.con[fd].flags) < 0)
			state.con[fd].flags = -1;
#endif
		if (n <= 0)
		{
#ifdef O_NONBLOCK
			if (n < 0 && errno == EAGAIN)
				state.con[fd].error++;
			else
#endif
			drop(fd);
			break;
		}
		s[n - 1] = 0;

		/*
		 * parse the message(s)
		 */

		sp = state.con[fd].info.shell;
		do
		{
			if (x = strchr(s, '\n')) *x++ = 0;
			while (isspace(*s)) s++;
			if (!(i = *s++)) continue;
			message((-3, "%s-message: %s", sp ? sp->name : "", s - 1));
			while (isspace(*s)) s++;
			if ((jp = state.job + (int)strtol(s, NiL, 0)) > state.jobmax) continue;
			while (*s && !isspace(*s)) s++;
			while (isspace(*s)) s++;

			/*
			 * now interpret the message
			 */

			switch (i)
			{
			case 'j':
				/*
				 * <s> is the job pid
				 */

				if (!sp) break;
				i = jp->pid;
				jp->pid = (int)strtol(s, NiL, 0);
				if (i == WARP) goto nuke;
				if (jp->cmd)
				{
					free(jp->cmd);
					jp->cmd = 0;
					state.jobwait--;
				}
				break;
			case 'n':
				/*
				 * <s> is the name of the anonymous shell and its pid
				 */

				if (!sp && (sp = search(GET, s, NiL, NiL)))
				{
					if (sp->fd < 0)
					{
						/*
						 * nuke the zombie that kicked this shell
						 */

						if (sp->fd != -1) waitpid(-sp->fd, NiL, 0);
						state.con[fd].type = SHELL;
						state.con[fd].info.shell = sp;
						while (*s && !isspace(*s)) s++;
						while (isspace(*s)) s++;
						sp->pid = (int)strtol(s, NiL, 0);
						sp->fd = fd;
						sp->open++;
						state.shellwait--;
						state.shells++;
					}
					else
					{
						sp = 0;
						drop(fd);
					}
				}
				break;
			case 'r':
				/*
				 * <s> is the shell rating
				 */

				if (!sp) break;
				sfsprintf(cmd, sizeof(cmd), "%s,%s", s, sp->name);
				search(DEF, cmd, NiL, NiL);
				break;
			case 'x':
				/*
				 * <s> is the job exit code and user,sys times
				 */

				if (!sp) break;
				jp->status = strtol(s, &t, 10);
				jp->sig = 0;
				for (;;)
				{
					if (t <= s) break;
					for (s = t; isalpha(*s) || isspace(*s); s++);
					jp->user += strelapsed(s, &t, CO_QUANT);
					if (t <= s) break;
					for (s = t; isalpha(*s) || isspace(*s); s++);
					jp->sys += strelapsed(s, &t, CO_QUANT);
				}
				if (jp->pid == START)
				{
					if (jp->cmd)
					{
						free(jp->cmd);
						jp->cmd = 0;
						state.jobwait--;
					}
					jp->pid = WARP;
				}
				else
				{
				nuke:
					/*
					 * nuke the zombies
					 */

					sfsprintf(cmd, sizeof(cmd), "wait %d\n", jp->pid);
					cswrite(fd, cmd, strlen(cmd));
					if (--jp->ref <= 0) jobdone(jp);
				}
				break;
			}
		} while (s = x);
		if (sp && sp->running)
			jobcheck(sp);
		break;
	case DEST:
		if (csread(fd, s = state.buf, 13, CS_EXACT) != 13 || s[0] != '#' || (jp = state.job + (int)strtol(s + 1, NiL, 10)) > state.jobmax || fstat(state.con[fd].info.pass.fd = (int)strtol(s + 7, NiL, 10), &st))
			drop(fd);
		else
		{
			state.con[fd].type = PASS;
			state.con[fd].info.pass.job = jp->pid ? jp : 0;
			state.con[fd].info.pass.serialize = (jp->flags & CO_SERIALIZE) ? sfstropen() : (Sfio_t*)0;
		}
		break;
	case IDENT:
		if ((i = csrecv(fd, &id, fds, elementsof(fds))) < 0) /* error */;
		else if (i < 3) errno = EBADF;
		else if (cswrite(fd, "#00000\n", 7) != 7) /* error */;
		else
		{
			if (i == 3)
			{
				close(fds[0]);
				fds[0] = n = fd;
				state.con[n].info.user.flags = 0;
			}
			else
			{
				drop(fd);
				csfd(n = fds[0], CS_POLL_READ);
				fds[0] = fds[3];
				state.con[n].info.user.flags = USER_IDENT;
				state.con[n].info.user.home = 0;
				state.con[n].info.user.pump = 0;
			}
			state.con[fds[0]].type = UCMD;
			state.con[fds[1]].type = UOUT;
			state.con[fds[2]].type = UERR;
			state.con[n].type = INIT;
			state.con[n].info.user.pid = id.pid;
			for (i = 0; i < elementsof(state.con[n].info.user.fds); i++)
			{
				fcntl(fds[i], F_SETFD, FD_CLOEXEC);
				state.con[n].info.user.fds[i] = fds[i];
			}
			break;
		}
		sfsprintf(state.buf, state.buflen, "#%05d\n", errno);
		cswrite(fd, state.buf, 7);
		drop(fd);
		while (--i >= 0)
			close(fds[i]);
		break;
	case INIT:
		if (csread(fd, s = cmd, 7, CS_EXACT) != 7 || s[0] != '#' || (i = (int)strtol(s + 1, NiL, 10)) && (i < 0 || i > state.buflen || csread(fd, state.buf, i, CS_EXACT) != i))
		{
			drop(fd);
			break;
		}
		state.con[fd].info.user.attr.label[0] = 0;
		x = 0;
		if (i)
		{
			s = state.buf;
			s[i] = 0;
			while (e = strchr(s, '\n'))
			{
				*e++ = 0;
				if (strneq(s, CO_ENV_ATTRIBUTES, sizeof(CO_ENV_ATTRIBUTES) - 1) && s[sizeof(CO_ENV_ATTRIBUTES) - 1] == '=')
				{
					x = s + sizeof(CO_ENV_ATTRIBUTES);
					if (*x == '\'')
					{
						i = 1;
						s = t = ++x;
						while (*s = *t++)
						{
							if (*s == '\'') i = !i;
							else if (i || *s != '\\') s++;
							else if (!(*s++ = *t++)) break;
						}
					}
				}
				else if (state.indirect.con)
				{
					if (streq(s, CO_OPT_COMMAND))
						state.con[fd].info.user.flags &= ~USER_IDENT;
					else if (streq(s, CO_OPT_DUP))
						state.con[fd].info.user.flags |= USER_DUP;
					else if (strneq(s, CO_OPT_HOME, sizeof(CO_OPT_HOME) - 1) && s[sizeof(CO_OPT_HOME) - 1] == '=' && (sp = search(GET, s + sizeof(CO_OPT_HOME), NiL, NiL)))
						(state.con[fd].info.user.home = sp)->home++;
					else if (strneq(s, CO_OPT_INDIRECT, sizeof(CO_OPT_INDIRECT) - 1) && s[sizeof(CO_OPT_INDIRECT) - 1] == '=')
						state.con[fd].info.user.pump = strdup(s + sizeof(CO_OPT_INDIRECT));
				}
				s = e;
			}
		}
		if (state.indirect.con && (state.con[fd].info.user.flags & (USER_IDENT|USER_INIT)) == USER_IDENT)
		{
			state.con[fd].info.user.flags |= USER_INIT;
			break;
		}
		if (x) state.con[fd].info.user.expr = strdup(x);
		attributes(x, &state.con[fd].info.user.attr, NiL);
		state.con[fd].info.user.attr.set &= ~SETLABEL;
		if (state.indirect.con)
		{
			if (!state.con[fd].info.user.pump ||
				(state.con[fd].info.user.fds[1] = csopen(state.con[fd].info.user.pump, 0)) < 0 ||
				cswrite(state.con[fd].info.user.fds[1], "#00001\n", 7) != 7)
			{
				drop(fd);
				break;
			}
			else
			{
				nonblocking(state.con[fd].info.user.fds[1]);
				state.con[state.con[fd].info.user.fds[1]].type = UOUT;
			}
			if (state.con[fd].info.user.flags & USER_DUP) state.con[fd].info.user.fds[2] = state.con[fd].info.user.fds[1];
			else if ((state.con[fd].info.user.fds[2] = csopen(state.con[fd].info.user.pump, 0)) < 0 ||
				cswrite(state.con[fd].info.user.fds[2], "#00002\n", 7) != 7)
			{
				drop(fd);
				break;
			}
			else state.con[state.con[fd].info.user.fds[2]].type = UERR;
		}
		else if (!fstat(state.con[fd].info.user.fds[1], &st) && !fstat(state.con[fd].info.user.fds[2], &ts) && st.st_ino == ts.st_ino && st.st_dev == ts.st_dev)
		{
			drop(state.con[fd].info.user.fds[2]);
			state.con[fd].info.user.fds[2] = state.con[fd].info.user.fds[1];
		}
		if (state.con[fd].info.user.flags & USER_IDENT)
		{
			s = state.buf;
			s += sfsprintf(s, state.buflen - (s - state.buf), "%s,%s", CO_OPT_SERVER, CO_OPT_ACK);
			if (state.indirect.con) s += sfsprintf(s, state.buflen - (s - state.buf), ",%s", CO_OPT_INDIRECT);
			s += sfsprintf(s, state.buflen - (s - state.buf), "\n");
			i = s - state.buf;
			if (cswrite(state.con[fd].info.user.fds[0], state.buf, i) != i)
			{
				drop(fd);
				break;
			}
		}
		state.con[fd].info.user.running = 0;
		state.con[fd].info.user.total = 0;
		state.con[fd].type = USER;
		state.users++;
		break;
	case MESG:
		if (csrecv(fd, &id, fds, 1) == 1)
		{
			i = fds[0];
			n = strlen(corinit);
			if (cswrite(i, corinit, n) == n)
			{
				state.con[i].type = ANON;
				state.con[i].info.shell = 0;
#ifdef O_NONBLOCK
				state.con[i].flags = fcntl(fd, F_GETFL, 0);
#endif
				csfd(i, CS_POLL_READ);
			}
			else close(i);
		}
		break;
	case PASS:
		if ((i = read(fd, state.buf, state.buflen)) <= 0)
			drop(fd);
		else
		{	
			if (state.identify)
			{
				n = state.con[fd].info.pass.fd;
				jp = state.con[fd].info.pass.job;
				if (state.con[n].info.ident.shell != jp->shell || state.con[n].info.ident.pid != jp->pid)
				{
					state.con[n].info.ident.shell = jp->shell;
					state.con[n].info.ident.pid = jp->pid;
					sfprintf(state.string, "%s", jp->shell->name);
					if (*(s = state.con[jp->fd].info.user.attr.label))
						sfprintf(state.string, " %s", s);
					if (*(s = state.con[fd].info.pass.job->label))
						sfprintf(state.string, " %s", s);
					else sfprintf(state.string, " %d", jp->pid);
					if (!(s = sfstruse(state.string)))
						error(3, "out of space");
					n = sfsprintf(cmd, sizeof(cmd) - 1, state.identify, s);
					cswrite(state.con[fd].info.pass.fd, cmd, n);
				}
			}
			if (state.con[fd].info.pass.serialize)
			{
				if (sfwrite(state.con[fd].info.pass.serialize, state.buf, i) != i)
					drop(fd);
			}
			else if (cswrite(state.con[fd].info.pass.fd, state.buf, i) != i)
				drop(fd);
		}
		break;
	case PUMP:
		if (csrecv(fd, &id, fds, 1) == 1)
		{
			i = fds[0];
			state.con[i].type = DEST;
			csfd(i, CS_POLL_READ);
		}
		break;
	case SCHED:
		if ((i = read(fd, state.buf, state.buflen)) <= 0)
			drop(fd);
		/*HERE*/
		break;
	case USER:
		if (csread(fd, cmd, 7, CS_EXACT) != 7 || cmd[0] != '#' || (n = (int)strtol(cmd + 1, NiL, 10)) <= 0)
		{
			drop(fd);
			break;
		}
		if (n > state.buflen)
		{
			state.buflen = roundof(n, CHUNK);
			if (!(state.buf = newof(state.buf, char, state.buflen, 0)))
				error(3, "out of space [buf]");
		}
		if (csread(fd, state.buf, n, CS_EXACT) != n)
		{
			drop(fd);
			break;
		}
		state.buf[n - 1] = 0;
		state.cmds++;
		n = error_info.errors;
		if (!(state.home = state.con[fd].info.user.home))
			state.home = state.shell;
		switch (i = *state.buf)
		{
		case 'e':
		case 'E':
			shellexec(NiL, state.buf, fd);
			n = error_info.errors;
			break;
		case 'k':
		case 'K':
			if (tokscan(state.buf, NiL, "%s %d %d ", NiL, &n1, &n2) != 3)
				error_info.errors++;
			else
			{
				message((-1, "kill state.con=%d rid=%d sig=%d", fd, n1, n2));
				for (jp = state.job; jp <= state.jobmax; jp++)
					if (jp->fd == fd && jp->pid && (!n1 || jp->rid == n1))
					{
						jobkill(jp, n2);
						if (n1) break;
					}
				n = error_info.errors;
			}
			break;
		case 's':
		case 'S':
			if (tokscan(state.buf, NiL, "%s %s %s %d %s", NiL, &s, &t, &n1, &x) != 5)
				error_info.errors++;
			else server(fd, *s, *t, n1, x);
			break;
		default:
			error(ERROR_OUTPUT|2, state.con[fd].info.user.fds[2], "%c: unknown op", i);
			break;
		}
		state.home = state.shell;
		if (isupper(i))
		{
			n = sfsprintf(state.buf, state.buflen, "a 1 %d\n", error_info.errors != n);
			cswrite(state.con[fd].info.user.fds[0], state.buf, n);
		}
		break;
	}
	return(0);
}

/*
 * wake up to check for hung shells and jobs on busy hosts
 */

static int
wakeup(void* handle)
{
	NoP(handle);
	shellcheck();
	jobcheck(NiL);
	return(0);
}

/*
 * indirect coshell initialization
 */

static void*
indirect(void* handle, int fdmax)
{
	int*	pass;

	NoP(handle);
	if (!(pass = newof(0, int, fdmax, 0)))
		error(3, "out of space [pass]");
	csfd(state.indirect.con, CS_POLL_READ);
	pass[state.indirect.con] = state.indirect.msg;
	csfd(state.indirect.cmd, CS_POLL_READ);
	pass[state.indirect.cmd] = state.indirect.con;
	return((void*)pass);
}

/*
 * indirect coshell data pump
 */

static int
pump(void* handle, register int fd)
{
	register int	n;
	register int	pd;
	register int*	pass = (int*)handle + fd;
	register char*	s = state.buf;

	if ((n = read(fd, s, state.buflen)) <= 0) goto drop;
	if (!(pd = *pass))
	{
		if ((n -= 7) < 0 || s[0] != '#' || (pd = (int)strtol(s + 1, NiL, 10)) < 1 || pd > 2) goto drop;
		*pass = pd == 1 ? state.indirect.out : state.indirect.err;
		if (!n) return(0);
	}
	if (cswrite(pd, s, n) != n) goto drop;
	return(0);
 drop:
	if (fd == state.indirect.cmd) exit(0);
	if (fd = *pass)
	{
		close(fd);
		*pass = 0;
	}
	return(-1);
}

/*
 * coshell main
 */

int
main(int argc, char** argv)
{
	register int		n;
	register int		fd;
	register int		i;
	int			pfd;
	int			d;
	char*			s;
	char*			t;
	int			fds[4];

	NoP(argc);
	setlocale(LC_ALL, "");
	opt_info.argv = argv;
	error_info.id = CO_ID;
	if (pathcheck(PATHCHECK, error_info.id, &state.check))
		exit(1);
	state.version = strdup(fmtident(usage));
	error(-1, "%s", state.version);
	if (!(state.string = sfstropen()))
		error(3, "out of space [string]");
	state.buflen = 8 * CHUNK;
	if (!(state.buf = newof(0, char, state.buflen, 0)))
		error(3, "out of space [buf]");

	/*
	 * optget() for self documentation
	 * "options" passed to the server as commands
	 */

	while ((s = argv[opt_info.index+1]) && s[0] == '-' && (s[1] == '-' || s[1] == '?'))
	{
		switch (optget(argv, usage))
		{
		case ':':
			error(2, "%s", opt_info.arg);
			break;
		case '?':
			error(ERROR_USAGE|4, "%s", opt_info.arg);
			break;
		}
		break;
	}
	argv += opt_info.index;
	if (error_info.errors)
		error(ERROR_USAGE|4, "%s", optusage(NiL));

	/*
	 * check for alternate connect stream
	 */

	if (*argv && (s = *++argv) && strmatch(s, "/dev/(fdp|tcp)/*")) argv++;
	else if (!(t = getenv(CO_ENV_SHELL)) || tokscan(t, NiL, " %s %s ", NiL, &s) != 2)
	{
		if ((fd = csopen(t = "/dev/fdp", 0)) >= 0) close(fd);
		else t = "/dev/tcp";
		sfsprintf(s = state.buf, state.buflen, "%s/local/%s/user", t, CO_ID);
	}
	if (!strmatch(s, "/dev/fdp/*")) state.indirect.con = 1;
	error(-1, "connect stream is %s", s);
	if (!(state.service = strdup(s)))
		error(3, "out of space [service]");

	/*
	 * check for alternate stdin
	 */

	if ((s = *argv) && strneq(s, "/dev/fd/", 8))
	{
		argv++;
		if (i = (int)strtol(s + 8, NiL, 0))
		{
			close(0);
			if (dup(i))
				error(ERROR_SYSTEM|3, "%s: cannot open", s);
		}
	}

	/*
	 * COMMAND	interactive or argv command processor to server (-*)
	 * DEFER	pass fds to the server and exit (no args or /dev/fd/n)
	 * SERVER	the server (+*)
	 */

	if ((s = strrchr(*opt_info.argv, '.')) && streq(s + 1, CS_SVC_SUFFIX))
	{
		n = SERVER;
		argv--;
	}
	else if (!(s = *argv)) n = DEFER;
	else if (s[0] == '+' && !s[1]) n = SERVER;
	else if (s[0] == '-' && s[1] != '?') n = COMMAND;
	else error(ERROR_USAGE|4, "[connect-stream] [-hjqQs[aelpst]] [-r host [cmd]] | + [info]");
	if (n != SERVER)
	{
		if ((fd = csopen(state.service, CS_OPEN_TEST)) < 0)
		{
			if (errno == ENOENT) error(3, "%s: server not running", state.service);
			else error(ERROR_SYSTEM|3, "%s: cannot open connect stream", state.service);
		}
		if (!state.indirect.con)
		{
			for (i = 0; i < elementsof(fds); i++)
				fds[i] = i;
			if (n == COMMAND)
				i--;
			else if (!(s = getenv(CO_ENV_MSGFD)) || (state.indirect.msg = (int)strtol(s, &t, 0)) <= 0 || *t)
				state.indirect.msg = i - 1;
			fds[i - 1] = state.indirect.msg;
		}
		errno = EINVAL;
		s = state.buf;
		if ((state.indirect.con || !cssend(fd, fds, i)) && csread(fd, s, 7, CS_EXACT) == 7 && s[0] == '#' && !(errno = (int)strtol(s + 1, NiL, 10))) do
		{
			if (state.indirect.con)
			{
				if ((pfd = csopen(t = "/dev/tcp/local/normal/slave", CS_OPEN_CREATE)) < 0)
					error(ERROR_SYSTEM|3, "%s: cannot create pump connect stream", t);
			}
			else if (n != COMMAND) exit(0);
			s += 7;
			s += sfsprintf(s, state.buflen - (s - state.buf), "%s=label=%s", CO_ENV_ATTRIBUTES, CO_ID);
			if ((t = getenv(CO_ENV_ATTRIBUTES)) && *t) s += sfsprintf(s, state.buflen - (s - state.buf), ",%s", t);
			if (state.indirect.con)
			{
				struct stat	st;
				struct stat	ts;

				s += sfsprintf(s, state.buflen - (s - state.buf), "\n%s=%s", CO_OPT_INDIRECT, cspath(pfd, 0));
				s += sfsprintf(s, state.buflen - (s - state.buf), "\n%s=%s", CO_OPT_HOME, csname(0));
				if (!fstat(1, &st) && !fstat(2, &ts) && st.st_ino == ts.st_ino && st.st_dev == ts.st_dev)
				{
					s += sfsprintf(s, state.buflen - (s - state.buf), "\n%s", CO_OPT_DUP);
					d = 1;
				}
				else d = 2;
				if (n == COMMAND) s += sfsprintf(s, state.buflen - (s - state.buf), "\n%s", CO_OPT_COMMAND);
			}
			s += sfsprintf(s, state.buflen - (s - state.buf), "\n");
			i = s - state.buf;
			s = state.buf;
			sfsprintf(s, 7, "#%05d\n", i - 7);
			if (cswrite(fd, s, i) != i) break;
			if (n == COMMAND)
			{
				if (state.indirect.con)
				{
					for (; d > 0 && csrecv(pfd, NiL, fds, 1) == 1 && csread(fds[0], s, 7, CS_EXACT) == 7 && s[0] == '#'; d--)
						if ((n = (int)strtol(s + 1, NiL, 10)) == 1) state.indirect.out = fds[0];
						else if (n == 2) state.indirect.err = fds[0];
						else break;
					if (d) break;
					state.indirect.con = pfd;
				}
				exit(command(fd, (*argv)[1] ? argv : (char**)0));
			}
			if ((state.indirect.cmd = dup(0)) < 0) break;
			close(0);
			if (dup(pfd)) break;
			close(pfd);
			csfd(state.indirect.cmd, 0);
			csfd(state.indirect.con = fd, 0);
			csfd(state.indirect.out = 1, 0);
			csfd(state.indirect.err = 2, 0);
			csfd(state.indirect.msg, 0);
			csserve(NiL, NiL, indirect, NiL, NiL, pump, NiL, NiL);
		} while (0);
		error(ERROR_SYSTEM|3, "%s: cannot connect to server", state.service);
	}
	state.argv = argv + 1;

	/*
	 * we are the server
	 */

	csserve(NiL, state.service, init, NiL, user, service, NiL, wakeup);
	/*NOTREACHED*/
	return 1;
}
